import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';

/// Test page that registers cases exercising RxDart's fork-join operators:
/// `Rx.forkJoinList`, `Rx.forkJoin`, `Rx.forkJoin2` … `Rx.forkJoin9` and
/// `ForkJoinStream.combine2`.
///
/// All cases are registered from the constructor via `test`, and values are
/// handed to `expect`; both come from `common/test_page.dart`. `expect` is
/// called with a single argument everywhere in this file, so whatever is
/// actually verified is decided by that helper
/// (NOTE(review): confirm its contract in `test_page.dart`).
class ForkJoinTestPage extends TestPage {
  /// Creates the page with the given title and registers every fork-join case.
  ForkJoinTestPage(super.title) {
    // Three sources of different lengths joined into a list.
    test('Rx.forkJoinList', () async {
      final combined = Rx.forkJoinList<int>([
        Stream.fromIterable([1, 2, 3]),
        Stream.value(2),
        Stream.value(3),
      ]);

      expect(combined);
    });

    // One of the two sources emits `null`; the combiner interpolates both.
    test('Rx.forkJoin.nullable', () {
      expect(
        ForkJoinStream.combine2(
          Stream.value(null),
          Stream.value(1),
          (a, b) => '$a $b',
        ),
      );
    });

    // The sources come from a lazy `sync*` iterable that counts how often it
    // is iterated; both the stream and the counter are handed to `expect`.
    test('Rx.forkJoin.iterate.once', () async {
      var iterationCount = 0;

      final stream = Rx.forkJoinList<int>(() sync* {
        ++iterationCount;
        yield Stream.value(1);
        yield Stream.value(2);
        yield Stream.value(3);
      }());

      expect(stream);
      expect(iterationCount);
    });

    // No sources at all.
    test('Rx.forkJoin.empty', () {
      expect(Rx.forkJoinList<int>([]));
    });

    // Exactly one source.
    test('Rx.forkJoinList.singleStream', () async {
      final combined = Rx.forkJoinList<int>([
        Stream.fromIterable([1, 2, 3]),
      ]);

      expect(combined);
    });

    // List-based variant with a combiner that sums the joined values.
    test('Rx.forkJoin', () async {
      final combined = Rx.forkJoin<int, int>(
        [
          Stream.fromIterable([1, 2, 3]),
          Stream.value(2),
          Stream.value(3),
        ],
        (values) => values.fold(0, (acc, val) => acc + val),
      );

      expect(combined);
    });

    // Joins the three shared fixture streams into a formatted string.
    // NOTE(review): a second case further down is also named 'Rx.forkJoin3';
    // confirm TestPage tolerates duplicate names.
    test('Rx.forkJoin3', () async {
      final stream = Rx.forkJoin3(
        streamA,
        streamB,
        streamC,
        (int aValue, int bValue, bool cValue) => '$aValue $bValue $cValue',
      );

      expect(stream);
    });

    // Hands over the stream and, separately, a closure that listens to it.
    test('Rx.forkJoin3.single.subscription', () async {
      final stream = Rx.forkJoin3(
        streamA,
        streamB,
        streamC,
        (int aValue, int bValue, bool cValue) => '$aValue $bValue $cValue',
      );

      expect(stream);
      expect(() => stream.listen(null));
    });

    test('Rx.forkJoin2', () async {
      final a = Stream.fromIterable(const [1, 2]);
      final b = Stream.value(2);

      final stream = Rx.forkJoin2(
        a,
        b,
        (int first, int second) => [first, second],
      );

      expect(stream);
    });

    // The combiner itself throws; the error is received through onError.
    test('Rx.forkJoin2.throws', () async {
      final a = Stream.value(1);
      final b = Stream.value(2);

      final stream = Rx.forkJoin2(a, b, (int first, int second) {
        throw Exception();
      });

      stream.listen(null, onError: (Object e, StackTrace s) {
        expect(e);
      });
    });

    // Three sources with different element types (int, String, double).
    test('Rx.forkJoin3', () async {
      final a = Stream<int>.value(1);
      final b = Stream<String>.value('2');
      final c = Stream<double>.value(3.0);

      final stream = Rx.forkJoin3(
        a,
        b,
        c,
        (int first, String second, double third) => [first, second, third],
      );

      expect(stream);
    });

    test('Rx.forkJoin4', () async {
      final a = Stream.value(1);
      final b = Stream<int>.value(2);
      final c = Stream<int>.value(3);
      final d = Stream<int>.value(4);

      final stream = Rx.forkJoin4(
        a,
        b,
        c,
        d,
        (int first, int second, int third, int fourth) =>
            [first, second, third, fourth],
      );

      expect(stream);
    });

    test('Rx.forkJoin5', () async {
      final a = Stream<int>.value(1);
      final b = Stream<int>.value(2);
      final c = Stream<int>.value(3);
      final d = Stream<int>.value(4);
      final e = Stream<int>.value(5);

      final stream = Rx.forkJoin5(
        a,
        b,
        c,
        d,
        e,
        (int first, int second, int third, int fourth, int fifth) =>
            <int>[first, second, third, fourth, fifth],
      );

      expect(stream);
    });

    test('Rx.forkJoin6', () async {
      final a = Stream<int>.value(1);
      final b = Stream<int>.value(2);
      final c = Stream<int>.value(3);
      final d = Stream<int>.value(4);
      final e = Stream<int>.value(5);
      final f = Stream<int>.value(6);

      // Must call forkJoin6 (not combineLatest6) so that this case really
      // covers the operator it is named after, like its sibling cases.
      final stream = Rx.forkJoin6(
        a,
        b,
        c,
        d,
        e,
        f,
        (int first, int second, int third, int fourth, int fifth, int sixth) =>
            [first, second, third, fourth, fifth, sixth],
      );

      expect(stream);
    });

    test('Rx.forkJoin7', () async {
      final a = Stream<int>.value(1);
      final b = Stream<int>.value(2);
      final c = Stream<int>.value(3);
      final d = Stream<int>.value(4);
      final e = Stream<int>.value(5);
      final f = Stream<int>.value(6);
      final g = Stream<int>.value(7);

      final stream = Rx.forkJoin7(
        a,
        b,
        c,
        d,
        e,
        f,
        g,
        (int first, int second, int third, int fourth, int fifth, int sixth,
                int seventh) =>
            [first, second, third, fourth, fifth, sixth, seventh],
      );

      expect(stream);
    });

    test('Rx.forkJoin8', () async {
      final a = Stream<int>.value(1);
      final b = Stream<int>.value(2);
      final c = Stream<int>.value(3);
      final d = Stream<int>.value(4);
      final e = Stream<int>.value(5);
      final f = Stream<int>.value(6);
      final g = Stream<int>.value(7);
      final h = Stream<int>.value(8);

      final stream = Rx.forkJoin8(
        a,
        b,
        c,
        d,
        e,
        f,
        g,
        h,
        (int first, int second, int third, int fourth, int fifth, int sixth,
                int seventh, int eighth) =>
            [first, second, third, fourth, fifth, sixth, seventh, eighth],
      );

      expect(stream);
    });

    test('Rx.forkJoin9', () async {
      final a = Stream<int>.value(1);
      final b = Stream<int>.value(2);
      final c = Stream<int>.value(3);
      final d = Stream<int>.value(4);
      final e = Stream<int>.value(5);
      final f = Stream<int>.value(6);
      final g = Stream<int>.value(7);
      final h = Stream<int>.value(8);
      final i = Stream<int>.value(9);

      final stream = Rx.forkJoin9(
        a,
        b,
        c,
        d,
        e,
        f,
        g,
        h,
        i,
        (int first, int second, int third, int fourth, int fifth, int sixth,
                int seventh, int eighth, int ninth) =>
            [
          first,
          second,
          third,
          fourth,
          fifth,
          sixth,
          seventh,
          eighth,
          ninth,
        ],
      );

      expect(stream);
    });

    // The joined stream is converted to a broadcast stream and listened to
    // twice before its `isBroadcast` flag is handed to `expect`.
    test('Rx.forkJoin.asBroadcastStream', () async {
      final stream = Rx.forkJoin3(
        streamA,
        streamB,
        streamC,
        (int aValue, int bValue, bool cValue) => '$aValue $bValue $cValue',
      ).asBroadcastStream();

      stream.listen(null);
      stream.listen(null);

      expect(stream.isBroadcast);
    });

    // One of the four sources is an error stream; the subscription is set to
    // cancel on the first error.
    test('Rx.forkJoin.error.shouldThrowA', () async {
      final streamWithError = Rx.forkJoin4(
        Stream.value(1),
        Stream.value(1),
        Stream.value(1),
        Stream<int>.error(Exception()),
        (int aValue, int bValue, int cValue, dynamic _) =>
            '$aValue $bValue $cValue $_',
      );

      streamWithError.listen(
        null,
        onError: (Object e, StackTrace s) {
          expect(e);
        },
        cancelOnError: true,
      );
    });

    // All sources are healthy, but the combiner throws.
    test('Rx.forkJoin.error.shouldThrowB', () async {
      final streamWithError = Rx.forkJoin3(
        Stream.value(1),
        Stream.value(1),
        Stream.value(1),
        (int aValue, int bValue, int cValue) {
          throw Exception('oh noes!');
        },
      );

      streamWithError.listen(
        null,
        onError: (Object e, StackTrace s) {
          expect(e);
        },
      );
    });

    // Three periodic sources (10 ms interval, four events each). The
    // subscription is paused right after listening, for 80 ms, and cancels
    // itself on the first joined value it receives.
    test('Rx.forkJoin.pause.resume', () async {
      final first = Stream.periodic(
        const Duration(milliseconds: 10),
        (index) => const [1, 2, 3, 4][index],
      ).take(4);
      final second = Stream.periodic(
        const Duration(milliseconds: 10),
        (index) => const [5, 6, 7, 8][index],
      ).take(4);
      final last = Stream.periodic(
        const Duration(milliseconds: 10),
        (index) => const [9, 10, 11, 12][index],
      ).take(4);

      // Declared `late` because the listener closure refers to the
      // subscription that `listen` is about to return.
      late StreamSubscription<Iterable<num>> subscription;
      subscription = Rx.forkJoin3(
        first,
        second,
        last,
        (int a, int b, int c) => [a, b, c],
      ).listen((value) {
        expect(value.elementAt(0));
        expect(value.elementAt(1));
        expect(value.elementAt(2));

        subscription.cancel();
      });

      subscription
          .pause(Future<void>.delayed(const Duration(milliseconds: 80)));
    });

    // The first source is empty, i.e. it closes without emitting anything.
    test('Rx.forkJoin.completed', () async {
      final stream = Rx.forkJoin2(
        Stream<int>.empty(),
        Stream.value(1),
        (int a, int b) => a + b,
      );
      expect(stream);
    });

    // The second source is an error stream followed (via concatWith) by
    // `Rx.timer(2, 100 ms)`; the joined stream is handed to `expect`.
    test('Rx.forkJoin.error.shouldThrowC', () async {
      final stream = Rx.forkJoin2(
        Stream.value(1),
        Stream<int>.error(Exception()).concatWith([
          Rx.timer(
            2,
            const Duration(milliseconds: 100),
          ),
        ]),
        (int a, int b) => a + b,
      );
      expect(stream);
    });

    // Same sources as shouldThrowC, but listened to directly with
    // cancelOnError so the error reaches the onError handler.
    test('Rx.forkJoin.error.shouldThrowD', () async {
      final stream = Rx.forkJoin2(
        Stream.value(1),
        Stream<int>.error(Exception()).concatWith([
          Rx.timer(
            2,
            const Duration(milliseconds: 100),
          ),
        ]),
        (int a, int b) => a + b,
      );

      stream.listen(
        (value) {},
        // Hand the error to `expect` directly; wrapping the call in another
        // closure inside the handler would build a function that is never
        // invoked, silently dropping the error.
        onError: (Object e, StackTrace s) {
          expect(e);
        },
        cancelOnError: true,
      );
    });
  }

  /// A new periodic stream per access: ticks every millisecond, emits the
  /// tick count, and is limited to the first three events.
  Stream<int> get streamA => Stream<int>.periodic(
        const Duration(milliseconds: 1),
        (int count) => count,
      ).take(3);

  /// A new stream per access that emits `1, 2, 3, 4` from a constant list.
  Stream<int> get streamB => Stream<int>.fromIterable(const <int>[1, 2, 3, 4]);

  /// A new controller-backed stream per access; `true` is added and the
  /// controller is closed before the stream is returned.
  Stream<bool> get streamC {
    final controller = StreamController<bool>()
      ..add(true)
      ..close();

    return controller.stream;
  }
}